package com.atguigu.flink.datastreamapi.sink;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.function.WaterSensorMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

/**
 * Created by Smexy on 2023/11/13
 *
 * old API: stream.addSink(SinkFunction x) — e.g. print
 * new API: stream.sinkTo(Sink x) — e.g. KafkaSink
 */
/**
 * Example: write a stream of JSON strings to Kafka with the new {@code KafkaSink}
 * (the {@code sinkTo(Sink)} API, replacing the legacy {@code addSink(SinkFunction)}).
 *
 * <p>Pipeline: socket text -> WaterSensor POJO -> JSON string -> Kafka topic "t2".
 */
public class Demo1_KafkaSink
{
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        // Build the KafkaSink. Under the hood it is a Kafka producer.
        KafkaSink<String> kafkaSink = KafkaSink
            // IN: the element type produced by the upstream operator
            .<String>builder()
            .setBootstrapServers("hadoop102:9092")
            // Serializer: Bean ----> serializer ----> ProducerRecord(byte[] K, byte[] V)
            .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                                              // write each element as the record value only (no key)
                                              .setValueSerializationSchema(new SimpleStringSchema())
                                              .setTopic("t2")
                                              .build()
            )
            // Delivery guarantee. EXACTLY_ONCE additionally requires checkpointing to be enabled.
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            // Required for EXACTLY_ONCE: a prefix for the producer transaction ids
            //.setTransactionalIdPrefix()
            // Extra Kafka producer settings; parameter names come from ProducerConfig
            .setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "200")
            .setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000")
            .build();

        // Read from the socket, convert to a JSON string, and sink to Kafka.
        env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            .map(JSON::toJSONString)
            .sinkTo(kafkaSink);

        // Let execution failures propagate (previously the exception was caught and
        // only printed, so a failed job submission exited with status 0).
        env.execute();
    }
}
